{
 "metadata": {
  "name": "",
  "signature": "sha256:11079f4265aa0d15e0bf53fe2dd27e64eb926948e4a2f0f43e8e08a276da43f4"
 },
 "nbformat": 3,
 "nbformat_minor": 0,
 "worksheets": [
  {
   "cells": [
    {
     "cell_type": "heading",
     "level": 1,
     "metadata": {},
     "source": [
      "Data aggregations on RDDs"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We can aggregate RDD data in Spark by using three different actions: `reduce`, `fold`, and `aggregate`. The last one is the more general one and someway includes the first two.  "
     ]
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "Getting the data and creating the RDD"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), containing nearly half million nework interactions. The file is provided as a Gzip file that we will download locally.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "import urllib\n",
      "f = urllib.urlretrieve (\"http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz\", \"kddcup.data_10_percent.gz\")"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 1
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "data_file = \"./kddcup.data_10_percent.gz\"\n",
      "raw_data = sc.textFile(data_file)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 2
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "Inspecting interaction duration by tag"
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Both `fold` and `reduce` take a function as an argument that is applied to two elements of the RDD. The `fold` action differs from `reduce` in that it gets and additional initial *zero value* to be used for the initial call. This value should be the identity element for the function provided.  "
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "As an example, imagine we want to know the total duration of our interactions for normal and attack interactions. We can use `reduce` as follows.    "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "# parse data\n",
      "csv_data = raw_data.map(lambda x: x.split(\",\"))\n",
      "\n",
      "# separate into different RDDs\n",
      "normal_csv_data = csv_data.filter(lambda x: x[41]==\"normal.\")\n",
      "attack_csv_data = csv_data.filter(lambda x: x[41]!=\"normal.\")"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 3
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The function that we pass to `reduce` gets and returns elements of the same type of the RDD. If we want to sum durations we need to extract that element into a new RDD.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))\n",
      "attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [],
     "prompt_number": 4
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "Now we can reduce these new RDDs.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)\n",
      "total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)\n",
      "\n",
      "print \"Total duration for 'normal' interactions is {}\".\\\n",
      "    format(total_normal_duration)\n",
      "print \"Total duration for 'attack' interactions is {}\".\\\n",
      "    format(total_attack_duration)"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "Total duration for 'normal' interactions is 21075991\n",
        "Total duration for 'attack' interactions is 2626792\n"
       ]
      }
     ],
     "prompt_number": 5
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We can go further and use counts to calculate duration means.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "normal_count = normal_duration_data.count()\n",
      "attack_count = attack_duration_data.count()\n",
      "\n",
      "print \"Mean duration for 'normal' interactions is {}\".\\\n",
      "    format(round(total_normal_duration/float(normal_count),3))\n",
      "print \"Mean duration for 'attack' interactions is {}\".\\\n",
      "    format(round(total_attack_duration/float(attack_count),3))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "Mean duration for 'normal' interactions is 216.657\n",
        "Mean duration for 'attack' interactions is 6.621\n"
       ]
      }
     ],
     "prompt_number": 6
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We have a first (and too simplistic) approach to identify attack interactions."
     ]
    },
    {
     "cell_type": "heading",
     "level": 2,
     "metadata": {},
     "source": [
      "A better way, using `aggregate`  "
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "The `aggregate` action frees us from the constraint of having the return be the same type as the RDD we are working on. Like with `fold`, we supply an initial zero value of the type we want to return. Then we provide two functions. The first one is used to combine the elements from our RDD with the accumulator. The second function is needed to merge two accumulators. Let's see it in action calculating the mean we did before.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "normal_sum_count = normal_duration_data.aggregate(\n",
      "    (0,0), # the initial value\n",
      "    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc\n",
      "    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators\n",
      ")\n",
      "\n",
      "print \"Mean duration for 'normal' interactions is {}\".\\\n",
      "    format(round(normal_sum_count[0]/float(normal_sum_count[1]),3))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "Mean duration for 'normal' interactions is 216.657\n"
       ]
      }
     ],
     "prompt_number": 7
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "In the previous aggregation, the accumulator first element keeps the total sum, while the second element keeps the count. Combining an accumulator with an RDD element consists in summing up the value and incrementing the count. Combining two accumulators requires just a pairwise sum.  "
     ]
    },
    {
     "cell_type": "markdown",
     "metadata": {},
     "source": [
      "We can do the same with attack type interactions.  "
     ]
    },
    {
     "cell_type": "code",
     "collapsed": false,
     "input": [
      "attack_sum_count = attack_duration_data.aggregate(\n",
      "    (0,0), # the initial value\n",
      "    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc\n",
      "    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators\n",
      ")\n",
      "\n",
      "print \"Mean duration for 'attack' interactions is {}\".\\\n",
      "    format(round(attack_sum_count[0]/float(attack_sum_count[1]),3))"
     ],
     "language": "python",
     "metadata": {},
     "outputs": [
      {
       "output_type": "stream",
       "stream": "stdout",
       "text": [
        "Mean duration for 'attack' interactions is 6.621\n"
       ]
      }
     ],
     "prompt_number": 8
    }
   ],
   "metadata": {}
  }
 ]
}